package com.wll.springbootmqttemqxdemo.other.client;

import com.wll.springbootmqttemqxdemo.other.server.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 订阅消费 消费者
 *
 * @author wanglulu
 * @date 2019/11/15$ 17:43$
 */
public class Client {

    public static void main(String[] args) {
        String topic = "demo/topics";
        String content = "Message from MqttPublishSample";
        int qos = 1;
        String broker = "tcp://localhost:1883";
        String clientId = "client";

        for (int i = 0; i < 10; i++) {
            batchClient(topic, content, qos, broker, clientId + i);
        }

    }

    /**
     * @param topic
     * @param content
     * @param qos
     * @param broker
     * @param clientId
     */
    public static void batchClient(String topic, String content, int qos, String broker, String clientId) {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            // mqtt客户端订阅消费
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            // mqtt连接参数
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: " + broker);
            // 连接
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);
            // 消息
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            // 根据主题订阅
            sampleClient.subscribe(topic, qos);
            // 消息监听
            sampleClient.setCallback(new PushCallback());
//            sampleClient.setCallback(new MqttCallback() {
//                @Override
//                public void connectionLost(Throwable throwable) {
//                }
//                // 消息监听处理
//                @Override
//                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//                    System.out.println("topic: --> "+topic);
//                    System.out.println("mqttMessage: -->"+mqttMessage);
//                    System.out.println("*******************");
//                    System.out.println("/////////");
//                }
//                @Override
//                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//                }
//            });
            System.out.println(clientId + "用户订阅成功");
            /** 监听完成 断开销毁
             System.out.println("Message published");
             sampleClient.disconnect();
             System.out.println("Disconnected");
             System.exit(0);
             */
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

}
